/**
 * Copyright 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { filter, map, mergeMap, Observable, Subject, tap, using } from 'rxjs';
import { WorkerConnector, WorkerConnectorUtil } from './connector';
import { KHIWorkerPacket } from 'src/app/worker/worker-types';

export interface IndexedResponse<Response> {
  index: number;
  response: Response;
}

export class ConnectorPool {
  constructor(private readonly connectors: WorkerConnector[]) {}

  /**
   * requestSeriesOfTasks generate an unscribable observable from the series of tasks.
   * Each task arguments given via requests are drafts of the actual request. The actual request is generated by the requestFinalizer just before sending it to the worker.
   */
  requestSeriesOfTasks<
    Request,
    FinalizedRequest extends KHIWorkerPacket,
    Response,
  >(
    requests: Request[],
    requestFinalizer: (request: Request) => Observable<FinalizedRequest>,
  ): Observable<IndexedResponse<Response>> {
    const availableWorkers = new Subject<WorkerConnector>();
    let currentRequestIndex = 0;
    let finishedCount = 0;
    return using(
      () => {
        setTimeout(() => {
          if (requests.length === 0) availableWorkers.complete();
          for (
            let i = 0;
            i < Math.min(requests.length, this.connectors.length);
            i++
          ) {
            availableWorkers.next(this.connectors[i]);
          }
        }, 0);
        return {
          unsubscribe: () => {
            if (!availableWorkers.closed) {
              availableWorkers.complete();
            }
          },
        };
      },
      () =>
        availableWorkers.pipe(
          filter(() => currentRequestIndex < requests.length),
          map((worker) => ({
            // allocate the emitted worker to the request with the index
            requestIndex: currentRequestIndex++,
            worker: worker,
          })),
          mergeMap(({ requestIndex, worker }) =>
            // prepare the request parameters
            requestFinalizer(requests[requestIndex]).pipe(
              map((finalizedRequest) => ({
                requestIndex: requestIndex,
                worker: worker,
                finalizedRequest: finalizedRequest,
              })),
            ),
          ),
          mergeMap(({ requestIndex, worker, finalizedRequest }) =>
            WorkerConnectorUtil.unary<Response>(worker, finalizedRequest).pipe(
              tap({
                next: () => {
                  finishedCount++;
                  if (!availableWorkers.closed) {
                    availableWorkers.next(worker);
                  }
                  if (
                    finishedCount === requests.length &&
                    !availableWorkers.closed
                  ) {
                    availableWorkers.complete();
                  }
                },
              }),
              map((response) => ({
                index: requestIndex,
                response: response,
              })),
            ),
          ),
        ),
    );
  }
}
